package org.hscoder.mongo.sync;

import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.*;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

public class AsyncMongoTest {

    private static MongoClient buildClient() {
        //服务器实例表
        List servers = new ArrayList();
        servers.add(new ServerAddress("localhost", 27018));

//配置构建器
        MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

//传入服务器实例
        settingsBuilder.applyToClusterSettings(
                builder -> builder.hosts(servers));

//构建 Client 实例
        MongoClient mongoClient = MongoClients.create(settingsBuilder.build());
        return mongoClient;
    }

    public static void main(String[] args) throws InterruptedException {

        MongoClient mongoClient = buildClient();
        MongoDatabase database = mongoClient.getDatabase("test");
        MongoCollection collection = database.getCollection("test");

//异步返回Publisher
        FindPublisher publisher = collection.find();

        printThread();
//订阅实现
        publisher.subscribe(new Subscriber<Document>() {
            @Override
            public void onSubscribe(Subscription s) {
                printThread();
                System.out.println("start...");
                //执行请求
                s.request(Integer.MAX_VALUE);

            }


            @Override
            public void onNext(Document document) {
                printThread();
                //获得文档
                System.out.println("Document:" + document.toJson());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }

            @Override
            public void onError(Throwable t) {
                printThread();
                System.out.println("error occurs.");
            }

            @Override
            public void onComplete() {
                printThread();
                System.out.println("finished.");
            }
        });

        Thread.sleep(100000000);

    }

    private static void printThread(){
        System.out.println(Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
    }
}
